package com.scau.jansing.thread.first.queue;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 *
 *
 * 无容量上限
 * Created by jansing on 2016/1/1.
 */
public class LearnPriorityBlockingQueue {
    public static void main(String[] args) {
        Random random = new Random(47);
        ExecutorService exec = Executors.newCachedThreadPool();
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
        exec.execute(new PrioritizedTaskProducer(queue, exec));
        exec.execute(new PrioritizedTaskConsumer(queue));
    }

}

class PrioritizedTask implements Runnable, Comparable<PrioritizedTask>{
    private Random random = new Random(47);
    private static int counter = 0;
    private final int id = counter++;
    private final int priority;
    protected static List<PrioritizedTask> sequence = new ArrayList<>();

    PrioritizedTask(int priority) {
        this.priority = priority;
        sequence.add(this);
    }
    @Override
    public String toString(){
        return String.format("[%1$-3d]", priority) + " Task " + id;
    }
    public String summary(){
        return "(" + id + ":" + priority + ")";
    }


    @Override
    public void run() {
        try{
            System.out.println(this);
            TimeUnit.MILLISECONDS.sleep(random.nextInt(250));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public int compareTo(PrioritizedTask o) {
        return priority < o.priority ? 1:
                (priority > o.priority ? -1 : 0);
    }

//    最后一个，输出（不是DelayQueue）队列信息并执行exec.shutdownNow
    public static class EndSentinel extends PrioritizedTask {
        private ExecutorService exec;
        EndSentinel(ExecutorService e) {
            //设置最低的优先级
            super(-1);
            exec = e;
        }

        @Override
        public void run(){
            for(PrioritizedTask pt : sequence){
                System.out.println(pt.summary());
            }
            System.out.println();
            System.out.println(this + " Calling shutdownNow");
            exec.shutdownNow();
        }
    }
}

class PrioritizedTaskProducer implements Runnable{
    private Random random = new Random(47);
    private Queue<Runnable> queue;
    private ExecutorService exec;
    public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e){
        queue = q;
        exec = e;
    }

    @Override
    public void run() {
        for(int i=0; i<20; i++){
            //增加20个随机优先级的，yield()实现在增加若干个后让Consumer执行，所以输出结果受优先级高低和是否创建并加入队列中因素影响
            queue.add(new PrioritizedTask(random.nextInt(10)));
            Thread.yield();
        }

        try{
            for(int i=0; i<10; i++){
                //增加10个最高优先级的
                TimeUnit.MILLISECONDS.sleep(250);
                queue.add(new PrioritizedTask(10));
            }
            for(int i=0; i<10; i++){
                //增加10个各级优先级的
                queue.add(new PrioritizedTask(i));
            }
            queue.add(new PrioritizedTask.EndSentinel(exec));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Finished PrioritizedTaskProducer");
    }
}

class PrioritizedTaskConsumer implements Runnable{
    private PriorityBlockingQueue<Runnable> q;
    public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q){
        this.q = q;
    }

    @Override
    public void run() {
        try{
            while(!Thread.interrupted()){
                q.take().run();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Finished PrioritizedTaskConsumer");
    }
}
